/*
 * Copyright (c) 2020, 2023, Huawei Technologies Co., Ltd. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

#ifndef SHARE_JBOOSTER_NET_COMMUNICATIONSTREAM_HPP
#define SHARE_JBOOSTER_NET_COMMUNICATIONSTREAM_HPP

#include "jbooster/net/message.hpp"
#include "jbooster/net/netCommon.hpp"
#include "memory/allocation.hpp"
#include "utilities/globalDefinitions.hpp"

class Thread;
typedef struct ssl_st SSL;
typedef struct ssl_ctx_st SSL_CTX;

/**
 * Base class of ServerStream and ClientStream.
 * We never use CommunicationStream directly. Instead, we specify ServerStream or ClientStream
 * each time. So there's no need to define any method as vitrual.
 */
class CommunicationStream: public CHeapObj<mtJBooster> {
private:
  int _conn_fd;                 // generated by the OS
  SSL* _ssl;
  uint32_t _stream_id;          // generated by the server
  int _errno;

  Message _msg_recv, _msg_send;

  bool _can_close_safely;

  // only to ensure that it's used by only one thread
  Thread* _cur_thread;

private:
  void handle_net_err(int comm_size, bool is_read);

  void set_errno(int eno) { _errno = eno; }
  int get_errno() { return _errno; }
  int get_and_clear_errno() { int eno = _errno; _errno = 0; return eno; }
  int return_errno_or_flag(int flag) { return get_errno() ? get_and_clear_errno() : flag; }
  int read_from_fd_or_ssl(char* buf, size_t size);
  int write_to_fd_or_ssl(char* buf, size_t size);

  uint32_t read_once_from_stream(char* buf, uint32_t size);
  uint32_t read_all_from_stream(char* buf, uint32_t size);
  uint32_t write_once_to_stream(char* buf, uint32_t size);
  uint32_t write_all_to_stream(char* buf, uint32_t size);

  bool check_received_message_type(MessageType expected);
  bool check_received_message_size();

  void assert_current_thread() NOT_DEBUG_RETURN;
  void assert_in_native() NOT_DEBUG_RETURN;

protected:
  CommunicationStream(Thread* thread):
          _conn_fd(-1),
          _ssl(nullptr),
          _stream_id(0),
          _errno(0),
          _msg_recv(SerializationMode::DESERIALIZE, this),
          _msg_send(SerializationMode::SERIALIZE, this),
          _can_close_safely(false),
          _cur_thread(thread) {}

  virtual ~CommunicationStream() { close_stream(); }

  void init_stream(int conn_fd, SSL* ssl) {
    _conn_fd = conn_fd;
    _ssl = ssl;
  }

  void close_stream();

  int recv_message();
  int send_message();

  uint32_t* stream_id_addr() { return &_stream_id; }
  void set_stream_id(uint32_t stream_id) { _stream_id = stream_id; }

  void set_can_close_safely() { _can_close_safely = true; }

public:
  bool is_stream_closed() { return _conn_fd < 0; }
  bool can_close_safely() { return _can_close_safely; }

  int conn_fd() { return _conn_fd; }
  uint32_t stream_id() { return _stream_id; }

  const Message& msg_recv() const { return _msg_recv; }
  const Message& msg_send() const { return _msg_send; }

  // Usage of the following APIs:
  // (Both the client and server can act as the requester and responder.)
  //
  // Requester:  send_request()                       recv_response()
  //                   |                                     A
  //                   V                                     |
  // Responder:  recv_request() -> parse_request() -> send_response()
  //
  // Or, of course, more generally:
  //
  // Side A:  send_request()                       recv_request() -> parse_request()
  //                |                                    A
  //                V                                    |
  // Side B:  recv_request() -> parse_request() -> send_request()
  //
  // @return: 0: fine; other values: error code
  //
  // **Note**: These operations (except parse_request()) may block the thread until
  //           JBoosterTimeout. So it's better not to invoke them in _thread_in_vm
  //           state of JavaThread, or the time to reach safepoints may be severely
  //           affected.
  //           @see ThreadToNativeFromVM, ThreadBlockInVM

  template <typename... Args>
  int send_request(MessageType type, const Args* const... args);

  int recv_request(MessageType& type);

  template <typename... Args>
  int recv_request(MessageType type, Args* const&... args);

  template <typename... Args>
  int parse_request(Args* const&... args);

  template <typename... Args>
  int send_response(const Args* const... args);

  template <typename... Args>
  int recv_response(Args* const&... args);

  MessageType recv_msg_type() { return _msg_recv.msg_type(); }

  Thread* current_thread() { return _cur_thread; }
  void set_current_thread(Thread* thread) { _cur_thread = thread; }

  bool set_read_write_timeout(uint32_t timeout_ms);
};

#endif // SHARE_JBOOSTER_NET_COMMUNICATIONSTREAM_HPP
